/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */

#define USING_LOG_PREFIX SHARE_SCHEMA
#include "ob_synonym_mgr.h"
#include "lib/allocator/ob_mod_define.h"
#include "lib/oblog/ob_log.h"
#include "lib/oblog/ob_log_module.h"
#include "share/schema/ob_schema_utils.h"

namespace oceanbase {
namespace share {
namespace schema {
using namespace std;
using namespace common;
using namespace common::hash;

namespace synonym_mgr {

//////////////////////////////////////////////////////////////////////////////////
// Filter functor: selects the synonym schemas whose tenant id equals the configured one.
struct Del_Schema_In_Tenant_Filter {
  Del_Schema_In_Tenant_Filter(uint64_t tenant_id) : tenant_id_(tenant_id)
  {}
  // Returns true when `value` belongs to tenant_id_. `value` is dereferenced unchecked.
  bool operator()(ObSimpleSynonymSchema* value)
  {
    const bool same_tenant = (tenant_id_ == value->get_tenant_id());
    return same_tenant;
  }
  uint64_t tenant_id_;  // tenant id to match against
};
// Action functor that removes one synonym schema from both containers of a
// synonym manager: the sorted `infos` vector and the name-keyed `map`.
//
// NOTE(review): this action mutates `infos`; ObSynonymMgr::for_each in this file
// iterates that same container while invoking its action, so confirm iterator
// validity before combining the two.
struct Del_Schema_In_Tenant_Action {
  Del_Schema_In_Tenant_Action()
  {}
  // Removes `value` from `infos` and erases its (tenant, database, name) key from `map`.
  //
  // Returns OB_ERR_UNEXPECTED when `value` is NULL or when the removal hands back a
  // NULL element; otherwise propagates the error code of the failing container call.
  // If the element counts of the two containers differ afterwards, the map is rebuilt
  // from `infos`; a rebuild failure is only logged and does not change the return value.
  int operator()(ObSimpleSynonymSchema* value, ObSynonymMgr::SynonymInfos& infos, ObSynonymMgr::ObSynonymMap& map)
  {
    int ret = OB_SUCCESS;
    ObSimpleSynonymSchema* value_to_del = NULL;
    if (OB_ISNULL(value)) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("invalid argument, delete value is null", K(value), K(ret));
    } else if (OB_FAIL(
                   infos.remove_if(value, ObSynonymMgr::compare_synonym, ObSynonymMgr::equal_synonym, value_to_del))) {
      LOG_WARN("fail to remove synonym schema", K(*value), K(ret));
    } else if (OB_ISNULL(value_to_del)) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("removed synonym schema return NULL", K(*value), K(value_to_del), K(ret));
    } else {
      ObSynonymHashWrapper hash_wrapper(value->get_tenant_id(), value->get_database_id(), value->get_synonym_name());
      if (OB_FAIL(map.erase_refactored(hash_wrapper))) {
        LOG_WARN("fail to delete synonym schema from hashmap", K(*value), K(ret));
      }
    }

    if (infos.count() != map.item_count()) {
      // `value` may still be NULL here (the check above only records the error),
      // so it must not be dereferenced unconditionally while logging.
      if (OB_ISNULL(value)) {
        LOG_WARN("synonym info is non-consistent",
            "synonym_infos_count",
            infos.count(),
            "synonym_map_item_count",
            map.item_count());
      } else {
        LOG_WARN("synonym info is non-consistent",
            "synonym_infos_count",
            infos.count(),
            "synonym_map_item_count",
            map.item_count(),
            "synonym_id",
            value->get_synonym_id(),
            "synonym_name",
            value->get_synonym_name());
      }
      // Best-effort repair: a failure here is logged but does not override `ret`.
      int tmp_ret = OB_SUCCESS;
      if (OB_SUCCESS != (tmp_ret = ObSynonymMgr::rebuild_synonym_hashmap(infos, map))) {
        LOG_WARN("rebuild synonym hashmap failed", K(tmp_ret));
      }
    }

    return ret;
  }
};
// Early-stop predicate paired with the tenant-wide delete functors: never asks to stop.
struct Del_Schema_In_Tenant_EarlyStopCondition {
  Del_Schema_In_Tenant_EarlyStopCondition()
  {}
  // Always reports "do not stop".
  bool operator()()
  {
    const bool should_stop = false;
    return should_stop;
  }
};
//////////////////////////////////////////////////////////////////////////////////
// Filter functor: matches the single schema identified by a (tenant id, synonym id) pair.
// Holds a reference to the id object, so that object must outlive the filter.
struct Del_Schema_Filter {
  Del_Schema_Filter(const ObTenantSynonymId& synonym) : synonym_(synonym)
  {}
  // Returns true only when both the tenant id and the synonym id of `value` match.
  bool operator()(ObSimpleSynonymSchema* value)
  {
    if (value->get_tenant_id() != synonym_.tenant_id_) {
      return false;
    }
    return value->get_synonym_id() == synonym_.synonym_id_;
  }
  const ObTenantSynonymId& synonym_;  // id pair to match (not owned)
};
//////////////////////////////////////////////////////////////////////////////////
// Filter functor: selects schemas whose synonym id equals the configured id.
struct Get_Synonym_Schema_Filter {
  Get_Synonym_Schema_Filter(uint64_t synonym_id) : synonym_id_(synonym_id)
  {}
  // Returns true when `value` carries synonym_id_. `value` is dereferenced unchecked.
  bool operator()(ObSimpleSynonymSchema* value)
  {
    const bool id_matches = (synonym_id_ == value->get_synonym_id());
    return id_matches;
  }
  uint64_t synonym_id_;  // synonym id to look for
};
// Action functor: publishes the visited schema through a caller-provided pointer reference.
struct Get_Synonym_Schema_Action {
  Get_Synonym_Schema_Action(const ObSimpleSynonymSchema*& synonym_schema) : synonym_schema_(synonym_schema)
  {}
  // Stores `value` into the bound output pointer; the containers are not touched.
  // Always returns OB_SUCCESS.
  int operator()(ObSimpleSynonymSchema* value, ObSynonymMgr::SynonymInfos& infos, ObSynonymMgr::ObSynonymMap& map)
  {
    synonym_schema_ = value;
    UNUSED(map);
    UNUSED(infos);
    return OB_SUCCESS;
  }
  const ObSimpleSynonymSchema*& synonym_schema_;  // output slot owned by the caller
};
// Early-stop predicate: reports true once the bound schema pointer has been filled in.
struct Get_Synonym_Schema_EarlyStopCondition {
  Get_Synonym_Schema_EarlyStopCondition(const ObSimpleSynonymSchema*& synonym_schema) : synonym_schema_(synonym_schema)
  {}
  // Returns true when the observed pointer is no longer NULL.
  bool operator()()
  {
    return NULL != synonym_schema_;
  }
  const ObSimpleSynonymSchema*& synonym_schema_;  // pointer being observed (not owned)
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Filter functor: selects schemas that live in a given database of a given tenant.
struct Get_Synonym_Filter {
  Get_Synonym_Filter(uint64_t db_id, uint64_t tenant_id) : db_id_(db_id), tenant_id_(tenant_id)
  {}
  // Returns true when both the database id and the tenant id of `value` match.
  bool operator()(ObSimpleSynonymSchema* value)
  {
    if (value->get_database_id() != db_id_) {
      return false;
    }
    return value->get_tenant_id() == tenant_id_;
  }
  uint64_t db_id_;      // database id to match
  uint64_t tenant_id_;  // tenant id to match
};
// Action functor: appends every visited schema to a caller-provided array.
struct Get_Synonym_Action {
  Get_Synonym_Action(ObIArray<const ObSimpleSynonymSchema*>& synonym_schemas) : synonym_schemas_(synonym_schemas)
  {}
  // Pushes `value` onto the output array and returns the push_back result.
  // The manager containers passed in are not used.
  int operator()(ObSimpleSynonymSchema* value, ObSynonymMgr::SynonymInfos& infos, ObSynonymMgr::ObSynonymMap& map)
  {
    UNUSED(infos);
    UNUSED(map);
    int ret = OB_SUCCESS;
    if (OB_FAIL(synonym_schemas_.push_back(value))) {
      LOG_WARN("push back failed", K(ret), K(value->get_synonym_name_str()));
    }
    return ret;
  }
  ObIArray<const ObSimpleSynonymSchema*>& synonym_schemas_;  // output array (not owned)
};
// Early-stop predicate used with the per-database lookup functors: always reports true.
struct Get_Synonym_EarlyStopCondition {
  bool operator()()
  {
    const bool should_stop = true;
    return should_stop;
  }
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Filter functor for deep copy: accepts every schema without inspecting it.
struct Deep_Copy_Filter {
  Deep_Copy_Filter()
  {}
  // Always returns true; `value` is ignored.
  bool operator()(ObSimpleSynonymSchema* value)
  {
    UNUSED(value);
    const bool accept = true;
    return accept;
  }
};
// Action functor for deep copy: adds each visited schema to a destination manager.
struct Deep_Copy_Action {
  Deep_Copy_Action(ObSynonymMgr& synonym_mgr) : synonym_mgr_(synonym_mgr)
  {}
  // Calls add_synonym(*value) on the destination manager.
  // Returns OB_ERR_UNEXPECTED for a NULL `value`, otherwise the add_synonym result.
  // The source containers passed in are not used.
  int operator()(ObSimpleSynonymSchema* value, ObSynonymMgr::SynonymInfos& infos, ObSynonymMgr::ObSynonymMap& map)
  {
    UNUSED(infos);
    UNUSED(map);
    int ret = OB_SUCCESS;
    if (OB_ISNULL(value)) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("value is NULL", K(value), K(ret));
    } else if (OB_FAIL(synonym_mgr_.add_synonym(*value))) {
      LOG_WARN("push back failed", K(ret), K(value->get_synonym_name_str()));
    }
    return ret;
  }
  ObSynonymMgr& synonym_mgr_;  // destination manager (not owned)
};
// Early-stop predicate for deep copy: never asks to stop.
struct Deep_Copy_EarlyStopCondition {
  bool operator()()
  {
    const bool should_stop = false;
    return should_stop;
  }
};
}  // namespace synonym_mgr
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Default constructor: default-constructs the ObSchema base, then puts every
// field into its reset state.
ObSimpleSynonymSchema::ObSimpleSynonymSchema() : ObSchema()
{
  this->reset();
}

// Allocator-aware constructor: hands `allocator` to the ObSchema base, then puts
// every field into its reset state.
ObSimpleSynonymSchema::ObSimpleSynonymSchema(ObIAllocator* allocator) : ObSchema(allocator)
{
  this->reset();
}

// Copy constructor: starts from a reset object and then delegates to the copy
// assignment operator, which records any copy failure in error_ret_.
ObSimpleSynonymSchema::ObSimpleSynonymSchema(const ObSimpleSynonymSchema& other) : ObSchema()
{
  this->reset();
  this->operator=(other);
}

// Destructor: performs no work of its own.
ObSimpleSynonymSchema::~ObSimpleSynonymSchema()
{
}

// Copy assignment.
//
// Resets this object, copies the scalar ids and the schema version from `other`,
// then deep-copies the synonym name and the object name. Because an assignment
// operator cannot return an error code, a failed string copy is reported by
// storing the error in error_ret_ (overriding the value copied from `other`).
// Self-assignment is a no-op.
ObSimpleSynonymSchema& ObSimpleSynonymSchema::operator=(const ObSimpleSynonymSchema& other)
{
  if (this != &other) {
    reset();
    int ret = OB_SUCCESS;
    error_ret_ = other.error_ret_;
    tenant_id_ = other.tenant_id_;
    synonym_id_ = other.synonym_id_;
    schema_version_ = other.schema_version_;
    database_id_ = other.database_id_;
    object_database_id_ = other.object_database_id_;
    if (OB_FAIL(deep_copy_str(other.synonym_name_, synonym_name_))) {
      LOG_WARN("Fail to deep copy synonym name", K(ret));
    } else if (OB_FAIL(deep_copy_str(other.object_name_, object_name_))) {
      // This branch copies object_name_; the message now names the right field.
      LOG_WARN("Fail to deep copy object name", K(ret));
    }
    if (OB_FAIL(ret)) {
      error_ret_ = ret;
    }
  }

  return *this;
}

// Restores the object to its freshly-constructed state: resets the ObSchema base,
// sets every id to OB_INVALID_ID, the schema version to OB_INVALID_VERSION, and
// resets both name strings.
void ObSimpleSynonymSchema::reset()
{
  ObSchema::reset();
  tenant_id_ = OB_INVALID_ID;
  synonym_id_ = OB_INVALID_ID;
  schema_version_ = OB_INVALID_VERSION;
  database_id_ = OB_INVALID_ID;
  object_database_id_ = OB_INVALID_ID;
  synonym_name_.reset();
  object_name_.reset();
}

// Returns true when every id differs from OB_INVALID_ID, the schema version is
// non-negative, and both the synonym name and the object name are non-empty.
bool ObSimpleSynonymSchema::is_valid() const
{
  const bool ids_are_set = (OB_INVALID_ID != tenant_id_) && (OB_INVALID_ID != synonym_id_) &&
                           (OB_INVALID_ID != database_id_) && (OB_INVALID_ID != object_database_id_);
  const bool version_is_set = (schema_version_ >= 0);
  const bool names_are_set = !synonym_name_.empty() && !object_name_.empty();
  return ids_are_set && version_is_set && names_are_set;
}

// Returns the size used for schema statistics: the size of the object itself plus,
// for each of the two name strings, its length and one extra byte
// (presumably for a terminating '\0' — TODO confirm).
int64_t ObSimpleSynonymSchema::get_convert_size() const
{
  int64_t total_size = sizeof(ObSimpleSynonymSchema);
  total_size += synonym_name_.length() + 1;
  total_size += object_name_.length() + 1;
  return total_size;
}

// Default constructor: the manager allocates from its own local allocator.
// The object is not usable until init() succeeds.
ObSynonymMgr::ObSynonymMgr()
    : is_inited_(false),
      local_allocator_(ObModIds::OB_SCHEMA_GETTER_GUARD),
      allocator_(local_allocator_),  // bind to the member allocator
      synonym_infos_(0, NULL, ObModIds::OB_SCHEMA_SYNONYM),
      synonym_map_(ObModIds::OB_SCHEMA_SYNONYM)
{
}

// Constructor using an external allocator: schema copies are allocated from
// `allocator`, which is held by reference. The local allocator is still constructed.
// The object is not usable until init() succeeds.
ObSynonymMgr::ObSynonymMgr(ObIAllocator& allocator)
    : is_inited_(false),
      local_allocator_(ObModIds::OB_SCHEMA_GETTER_GUARD),
      allocator_(allocator),  // bind to the caller's allocator
      synonym_infos_(0, NULL, ObModIds::OB_SCHEMA_SYNONYM),
      synonym_map_(ObModIds::OB_SCHEMA_SYNONYM)
{
}

// Destructor: performs no work of its own.
ObSynonymMgr::~ObSynonymMgr()
{
}

// Initializes the synonym hash map and marks the manager as ready.
// Returns OB_INIT_TWICE when called on an already initialized manager, or the
// error code of the map initialization when that fails.
int ObSynonymMgr::init()
{
  int ret = OB_SUCCESS;
  if (!is_inited_) {
    if (OB_FAIL(synonym_map_.init())) {
      LOG_WARN("init private synonym map failed", K(ret));
    } else {
      is_inited_ = true;
    }
  } else {
    ret = OB_INIT_TWICE;
    LOG_WARN("init private synonym manager twice", K(ret));
  }
  return ret;
}

// Empties both the schema vector and the hash map. On a manager that was never
// initialized this only logs a warning. is_inited_ is left unchanged.
void ObSynonymMgr::reset()
{
  if (is_inited_) {
    synonym_infos_.clear();
    synonym_map_.clear();
  } else {
    LOG_WARN("synonym manger not init");
  }
}

// Copy assignment: delegates to assign(). Errors (including "not initialized")
// are only logged because the operator has no way to return them.
ObSynonymMgr& ObSynonymMgr::operator=(const ObSynonymMgr& other)
{
  int ret = OB_SUCCESS;
  if (is_inited_) {
    if (OB_FAIL(assign(other))) {
      LOG_WARN("assign failed", K(ret));
    }
  } else {
    ret = OB_NOT_INIT;
    LOG_WARN("synonym manager not init", K(ret));
  }
  return *this;
}

// Copies the containers of `other` into this manager via their own assign():
// first the hash map, then the schema vector. The schema objects themselves are
// not re-allocated here (see deep_copy for that).
// Returns OB_NOT_INIT on an uninitialized manager, or the first container error.
// Self-assignment is a no-op.
int ObSynonymMgr::assign(const ObSynonymMgr& other)
{
  int ret = OB_SUCCESS;
  if (!is_inited_) {
    ret = OB_NOT_INIT;
    LOG_WARN("synonym manager not init", K(ret));
  } else if (this == &other) {
    // assigning to itself: nothing to copy
  } else if (OB_FAIL(synonym_map_.assign(other.synonym_map_))) {
    LOG_WARN("assign synonym map failed", K(ret));
  } else if (OB_FAIL(synonym_infos_.assign(other.synonym_infos_))) {
    LOG_WARN("assign synonym infos vector failed", K(ret));
  }
  return ret;
}

int ObSynonymMgr::deep_copy(const ObSynonymMgr& other)
{
  int ret = OB_SUCCESS;
  if (!is_inited_) {
    ret = OB_NOT_INIT;
    LOG_WARN("synonym manager not init", K(ret));
  } else if (this != &other) {
    reset();
    synonym_mgr::Deep_Copy_Filter filter;
    synonym_mgr::Deep_Copy_Action action(*this);
    synonym_mgr::Deep_Copy_EarlyStopCondition condition;
    if (OB_FAIL((const_cast<ObSynonymMgr&>(other)).for_each(filter, action, condition))) {
      LOG_WARN("deep copy failed", K(ret));
    }
  }
  return ret;
}

int ObSynonymMgr::add_synonym(const ObSimpleSynonymSchema& synonym_schema)
{
  int ret = OB_SUCCESS;
  int overwrite = 1;
  ObSimpleSynonymSchema* new_synonym_schema = NULL;
  SynonymIter iter = NULL;
  ObSimpleSynonymSchema* replaced_synonym = NULL;
  if (!is_inited_) {
    ret = OB_NOT_INIT;
    LOG_WARN("synonym manager not init", K(ret));
  } else if (OB_UNLIKELY(!synonym_schema.is_valid())) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid argument", K(ret), K(synonym_schema));
  } else if (OB_FAIL(ObSchemaUtils::alloc_schema(allocator_, synonym_schema, new_synonym_schema))) {
    LOG_WARN("alloca synonym schema failed", K(ret));
  } else if (OB_ISNULL(new_synonym_schema)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("NULL ptr", K(new_synonym_schema), K(ret));
  } else if (OB_FAIL(
                 synonym_infos_.replace(new_synonym_schema, iter, compare_synonym, equal_synonym, replaced_synonym))) {
    LOG_WARN("failed to add synonym schema", K(ret));
  } else {
    ObSynonymHashWrapper hash_wrapper(new_synonym_schema->get_tenant_id(),
        new_synonym_schema->get_database_id(),
        new_synonym_schema->get_synonym_name());
    if (OB_FAIL(synonym_map_.set_refactored(hash_wrapper, new_synonym_schema, overwrite))) {
      LOG_WARN("build synonym hash map failed", K(ret));
    }
  }
  if (synonym_infos_.count() != synonym_map_.item_count()) {
    LOG_WARN("synonym info is non-consistent",
        "synonym_infos_count",
        synonym_infos_.count(),
        "synonym_map_item_count",
        synonym_map_.item_count(),
        "synonym_id",
        synonym_schema.get_synonym_id(),
        "synonym_name",
        synonym_schema.get_synonym_name());
    int tmp_ret = OB_SUCCESS;
    if (OB_SUCCESS != (tmp_ret = ObSynonymMgr::rebuild_synonym_hashmap(synonym_infos_, synonym_map_))) {
      LOG_WARN("rebuild synonym hashmap failed", K(tmp_ret));
    }
  }
  return ret;
}

int ObSynonymMgr::rebuild_synonym_hashmap(const SynonymInfos& synonym_infos, ObSynonymMap& synonym_map)
{
  int ret = OB_SUCCESS;
  synonym_map.clear();
  ConstSynonymIter iter = synonym_infos.begin();
  for (; iter != synonym_infos.end() && OB_SUCC(ret); ++iter) {
    ObSimpleSynonymSchema* synonym_schema = *iter;
    if (OB_ISNULL(synonym_schema)) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("synonym schema is NULL", K(synonym_schema), K(ret));
    } else {
      bool overwrite = true;
      ObSynonymHashWrapper hash_wrapper(
          synonym_schema->get_tenant_id(), synonym_schema->get_database_id(), synonym_schema->get_synonym_name());
      if (OB_FAIL(synonym_map.set_refactored(hash_wrapper, synonym_schema, overwrite))) {
        LOG_WARN("build synonym hash map failed", K(ret));
      }
    }
  }
  return ret;
}

// Adds every schema of `synonym_schemas` in order via add_synonym(), stopping at
// the first failure and returning its error code.
int ObSynonymMgr::add_synonyms(const common::ObIArray<ObSimpleSynonymSchema>& synonym_schemas)
{
  int ret = OB_SUCCESS;
  int64_t idx = 0;
  while (OB_SUCC(ret) && idx < synonym_schemas.count()) {
    if (OB_FAIL(add_synonym(synonym_schemas.at(idx)))) {
      LOG_WARN("push synonym failed", K(ret));
    }
    ++idx;
  }
  return ret;
}

int ObSynonymMgr::get_synonym_schema_with_name(const uint64_t tenant_id, const uint64_t database_id,
    const ObString& name, const ObSimpleSynonymSchema*& synonym_schema) const
{
  int ret = OB_SUCCESS;
  synonym_schema = NULL;
  if (!is_inited_) {
    ret = OB_NOT_INIT;
    LOG_WARN("not init", K(ret));
  } else if (OB_INVALID_ID == tenant_id || OB_INVALID_ID == database_id || name.empty()) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid argument", K(ret), K(tenant_id), K(database_id), K(name));
  } else {
    ObSimpleSynonymSchema* tmp_schema = NULL;
    ObSynonymHashWrapper hash_wrap(tenant_id, database_id, name);
    if (OB_FAIL(synonym_map_.get_refactored(hash_wrap, tmp_schema))) {
      if (OB_HASH_NOT_EXIST == ret) {
        ret = OB_SUCCESS;
        LOG_INFO("synonym is not exist", K(tenant_id), K(database_id), K(name));
      }
    } else {
      synonym_schema = tmp_schema;
    }
  }
  return ret;
}

int ObSynonymMgr::get_synonym_info_version(uint64_t synonym_id, int64_t& synonym_version) const
{
  int ret = OB_SUCCESS;
  synonym_version = OB_INVALID_ID;
  ConstSynonymIter iter = synonym_infos_.begin();
  for (; OB_SUCC(ret) && iter != synonym_infos_.end(); ++iter) {
    const ObSimpleSynonymSchema* synonym = NULL;
    if (OB_ISNULL(synonym = *iter)) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("NULL ptr", K(ret));
    } else if (synonym->get_synonym_id() == synonym_id) {
      synonym_version = synonym->get_schema_version();
      break;
    }
  }
  return ret;
}

// Provided no fatal error occurs, this function always returns OB_SUCCESS,
// regardless of whether do_exist ends up true or false.
// Resolves a synonym name to the object it refers to.
//
// The name is looked up first in `database_id`, then, if not found there, in the
// database id built from combine_id(tenant_id, OB_PUBLIC_SCHEMA_ID).
// On a hit, `do_exist` is true and `obj_database_id`, `obj_table_name` and
// `synonym_id` are filled from the schema; on a miss `do_exist` is false and the
// other outputs are left untouched.
// Returns OB_NOT_INIT / OB_INVALID_ARGUMENT for an unusable manager or
// arguments, or the error of the underlying lookup.
int ObSynonymMgr::get_object(const uint64_t tenant_id, const uint64_t database_id, const ObString& name,
    uint64_t& obj_database_id, uint64_t& synonym_id, ObString& obj_table_name, bool& do_exist) const
{
  int ret = OB_SUCCESS;
  const ObSimpleSynonymSchema* synonym_schema = nullptr;
  do_exist = false;
  if (!is_inited_) {
    ret = OB_NOT_INIT;
    LOG_WARN("not init", K(ret));
  } else if (OB_INVALID_ID == tenant_id || OB_INVALID_ID == database_id || name.empty()) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid argument", K(ret), K(tenant_id), K(database_id), K(name));
  }

  // Two attempts at most: the requested database first, the public schema second.
  for (int64_t i = 0; OB_SUCC(ret) && !do_exist && i < 2; ++i) {
    uint64_t cur_database_id = database_id;
    if (0 != i) {
      cur_database_id = combine_id(tenant_id, OB_PUBLIC_SCHEMA_ID);
    }
    if (OB_FAIL(get_synonym_schema_with_name(tenant_id, cur_database_id, name, synonym_schema))) {
      LOG_WARN("fail to get synonym schema with name", K(tenant_id), K(cur_database_id), K(name), K(ret));
    } else if (OB_ISNULL(synonym_schema)) {
      LOG_INFO("synonym is not exist", K(tenant_id), K(cur_database_id), K(name), K(ret));
    } else {
      do_exist = true;
    }
  }

  if (OB_SUCC(ret) && NULL != synonym_schema) {
    synonym_id = synonym_schema->get_synonym_id();
    obj_database_id = synonym_schema->get_object_database_id();
    obj_table_name = synonym_schema->get_object_name_str();
    do_exist = true;
  }
  return ret;
}

int ObSynonymMgr::del_synonym(const ObTenantSynonymId& synonym)
{
  int ret = OB_SUCCESS;
  int hash_ret = OB_SUCCESS;
  ObSimpleSynonymSchema* schema_to_del = NULL;
  if (!synonym.is_valid()) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid argument", K(ret), K(synonym));
  } else if (OB_FAIL(synonym_infos_.remove_if(
                 synonym, compare_with_tenant_synonym_id, equal_to_tenant_synonym_id, schema_to_del))) {
    LOG_WARN("failed to remove synonym schema, ",
        "tenant_id",
        synonym.tenant_id_,
        "synonym_id",
        synonym.synonym_id_,
        K(ret));
  } else if (OB_ISNULL(schema_to_del)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("removed synonym schema return NULL, ",
        "tenant_id",
        synonym.tenant_id_,
        "synonym_id",
        synonym.synonym_id_,
        K(ret));
  } else {
    ObSynonymHashWrapper synonym_wrapper(
        schema_to_del->get_tenant_id(), schema_to_del->get_database_id(), schema_to_del->get_synonym_name());
    hash_ret = synonym_map_.erase_refactored(synonym_wrapper);
    if (OB_SUCCESS != hash_ret) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("failed delete synonym from synonym hashmap, ",
          K(ret),
          K(hash_ret),
          "tenant_id",
          schema_to_del->get_tenant_id(),
          "database_id",
          schema_to_del->get_database_id(),
          "name",
          schema_to_del->get_synonym_name());
    }
  }

  if (synonym_infos_.count() != synonym_map_.item_count()) {
    LOG_WARN("synonym info is non-consistent",
        "synonym_infos_count",
        synonym_infos_.count(),
        "synonym_map_item_count",
        synonym_map_.item_count(),
        "synonym_id",
        synonym.synonym_id_);
    int tmp_ret = OB_SUCCESS;
    if (OB_SUCCESS != (tmp_ret = ObSynonymMgr::rebuild_synonym_hashmap(synonym_infos_, synonym_map_))) {
      LOG_WARN("rebuild synonym hashmap failed", K(tmp_ret));
    }
  }
  return ret;
}

int ObSynonymMgr::get_synonym_schemas_in_database(
    const uint64_t tenant_id, const uint64_t database_id, ObIArray<const ObSimpleSynonymSchema*>& synonym_schemas) const
{
  int ret = OB_SUCCESS;
  synonym_mgr::Get_Synonym_Filter filter(database_id, tenant_id);
  synonym_mgr::Get_Synonym_Action action(synonym_schemas);
  synonym_mgr::Get_Synonym_EarlyStopCondition condition;
  if (OB_FAIL((const_cast<ObSynonymMgr&>(*this)).for_each(filter, action, condition))) {
    LOG_WARN("get schema failed", K(ret));
  }
  return ret;
}

int ObSynonymMgr::del_schemas_in_tenant(const uint64_t tenant_id)
{
  int ret = OB_SUCCESS;
  if (OB_INVALID_ID == tenant_id) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid argument", K(ret), K(tenant_id));
  } else {
    ObArray<const ObSimpleSynonymSchema*> schemas;
    if (OB_FAIL(get_synonym_schemas_in_tenant(tenant_id, schemas))) {
      LOG_WARN("get synonym schemas failed", K(ret), K(tenant_id));
    } else {
      FOREACH_CNT_X(schema, schemas, OB_SUCC(ret))
      {
        ObTenantSynonymId tenant_synonym_id(tenant_id, (*schema)->get_synonym_id());
        if (OB_FAIL(del_synonym(tenant_synonym_id))) {
          LOG_WARN("del synonym failed",
              "tenant_id",
              tenant_synonym_id.tenant_id_,
              "synonym_id",
              tenant_synonym_id.synonym_id_,
              K(ret));
        }
      }
    }
  }
  return ret;
}

int ObSynonymMgr::get_synonym_schema(const uint64_t synonym_id, const ObSimpleSynonymSchema*& synonym_schema) const
{
  int ret = OB_SUCCESS;
  synonym_schema = NULL;
  synonym_mgr::Get_Synonym_Schema_Filter filter(synonym_id);
  synonym_mgr::Get_Synonym_Schema_Action action(synonym_schema);
  synonym_mgr::Get_Synonym_Schema_EarlyStopCondition condition(synonym_schema);
  if (OB_FAIL((const_cast<ObSynonymMgr&>(*this)).for_each(filter, action, condition))) {
    LOG_WARN("get synonym schema failed", K(ret));
  }
  return ret;
}

template <typename Filter, typename Acation, typename EarlyStopCondition>
int ObSynonymMgr::for_each(Filter& filter, Acation& action, EarlyStopCondition& condition)
{
  int ret = OB_SUCCESS;
  ConstSynonymIter iter = synonym_infos_.begin();
  for (; iter != synonym_infos_.end() && OB_SUCC(ret); iter++) {
    ObSimpleSynonymSchema* value = NULL;
    if (OB_ISNULL(value = *iter)) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("NULL ptr", K(ret));
    } else {
      if (filter(value)) {
        if (OB_FAIL(action(value, synonym_infos_, synonym_map_))) {
          LOG_WARN("action failed", K(ret));
        }
      }
    }
  }
  UNUSED(condition);
  return ret;
}

// Ordering predicate between a stored schema and a lookup key: true when the
// schema's (tenant id, synonym id) is less than `tenant_synonym_id`.
// A NULL schema never compares less.
bool ObSynonymMgr::compare_with_tenant_synonym_id(
    const ObSimpleSynonymSchema* lhs, const ObTenantSynonymId& tenant_synonym_id)
{
  if (NULL == lhs) {
    return false;
  }
  return lhs->get_tenant_synonym_id() < tenant_synonym_id;
}

// Equality predicate between a stored schema and a lookup key: true when the
// schema's (tenant id, synonym id) equals `tenant_synonym_id`.
// A NULL schema never compares equal.
bool ObSynonymMgr::equal_to_tenant_synonym_id(
    const ObSimpleSynonymSchema* lhs, const ObTenantSynonymId& tenant_synonym_id)
{
  if (NULL == lhs) {
    return false;
  }
  return lhs->get_tenant_synonym_id() == tenant_synonym_id;
}

int ObSynonymMgr::get_synonym_schemas_in_tenant(
    const uint64_t tenant_id, ObIArray<const ObSimpleSynonymSchema*>& synonym_schemas) const
{
  int ret = OB_SUCCESS;
  synonym_schemas.reset();

  ObTenantSynonymId tenant_outine_id_lower(tenant_id, OB_MIN_ID);
  ConstSynonymIter tenant_synonym_begin =
      synonym_infos_.lower_bound(tenant_outine_id_lower, compare_with_tenant_synonym_id);
  bool is_stop = false;
  for (ConstSynonymIter iter = tenant_synonym_begin; OB_SUCC(ret) && iter != synonym_infos_.end() && !is_stop; ++iter) {
    const ObSimpleSynonymSchema* synonym = NULL;
    if (OB_ISNULL(synonym = *iter)) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("NULL ptr", K(ret), K(synonym));
    } else if (tenant_id != synonym->get_tenant_id()) {
      is_stop = true;
    } else if (OB_FAIL(synonym_schemas.push_back(synonym))) {
      LOG_WARN("push back synonym failed", K(ret));
    }
  }

  return ret;
}

int ObSynonymMgr::get_synonym_schema_count(int64_t& synonym_schema_count) const
{
  int ret = OB_SUCCESS;
  if (!is_inited_) {
    LOG_WARN("synonym manger not init");
  } else {
    synonym_schema_count = synonym_infos_.size();
  }
  return ret;
}

// Fills `schema_info` with statistics for synonym schemas: the type tag
// SYNONYM_SCHEMA, the number of schemas held, and the sum of their
// get_convert_size() values.
// `schema_info` is reset and tagged even when the manager is not initialized.
// Returns OB_NOT_INIT in that case, or OB_ERR_UNEXPECTED on a NULL element.
int ObSynonymMgr::get_schema_statistics(ObSchemaStatisticsInfo& schema_info) const
{
  int ret = OB_SUCCESS;
  schema_info.reset();
  schema_info.schema_type_ = SYNONYM_SCHEMA;
  if (is_inited_) {
    schema_info.count_ = synonym_infos_.size();
    ConstSynonymIter cur = synonym_infos_.begin();
    while (OB_SUCC(ret) && cur != synonym_infos_.end()) {
      if (OB_ISNULL(*cur)) {
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("schema is null", K(ret));
      } else {
        schema_info.size_ += (*cur)->get_convert_size();
      }
      ++cur;
    }
  } else {
    ret = OB_NOT_INIT;
    LOG_WARN("not init", K(ret));
  }
  return ret;
}

}  // end of namespace schema
}  // end of namespace share
}  // end of namespace oceanbase
